{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "%run startup.py"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "application/javascript": [
       "$.getScript('./assets/js/ipython_notebook_toc.js')"
      ],
      "text/plain": [
       "<IPython.core.display.Javascript object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "%%javascript\n",
    "$.getScript('./assets/js/ipython_notebook_toc.js')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# A Decision Tree of Observable Operators\n",
    "\n",
    "## Part 4: Grouping, Buffering, Delaying, misc\n",
    "\n",
    "> source: http://reactivex.io/documentation/operators.html#tree.  \n",
    "> (transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, [axiros](http://www.axiros.com))  \n",
    "\n",
    "**This tree can help you find the ReactiveX Observable operator you’re looking for.**  \n",
    "See [Part 1](./A Decision Tree of Observable Operators. Part I - Creation.ipynb) for Usage and Output Instructions.  \n",
    "\n",
    "We also require acquaintance with the [marble diagrams](./Marble Diagrams.ipynb) feature of RxPy.\n",
    "\n",
    "<h2 id=\"tocheading\">Table of Contents</h2>\n",
    "<div id=\"toc\"></div>\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# I want to shift the items emitted by an Observable forward in time before reemitting them"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... **[delay](http://reactivex.io/documentation/operators/delay.html) **"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== delay ==========\n",
      "\n",
      "module rx.linq.observable.delay\n",
      "@extensionmethod(Observable)\n",
      "def delay(self, duetime, scheduler=None):\n",
      "    Time shifts the observable sequence by duetime. The relative time\n",
      "    intervals between the values are preserved.\n",
      "\n",
      "    1 - res = rx.Observable.delay(datetime())\n",
      "    2 - res = rx.Observable.delay(datetime(), Scheduler.timeout)\n",
      "\n",
      "    3 - res = rx.Observable.delay(5000)\n",
      "    4 - res = rx.Observable.delay(5000, Scheduler.timeout)\n",
      "\n",
      "    Keyword arguments:\n",
      "    :param datetime|int duetime: Absolute (specified as a datetime object) or\n",
      "        relative time (specified as an integer denoting milliseconds) by which\n",
      "        to shift the observable sequence.\n",
      "    :param Scheduler scheduler: [Optional] Scheduler to run the delay timers on.\n",
      "        If not specified, the timeout scheduler is used.\n",
      "\n",
      "    :returns: Time-shifted sequence.\n",
      "    :rtype: Observable\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   3.0     M New subscription on stream 276458585\n",
      "  14.7    T4 [next]   11.7: 1\n",
      " 125.8    T5 [next]  122.8: 2\n",
      " 169.0   T20 [next]  165.9: a\n",
      " 235.4    T7 [next]  232.4: 3\n",
      " 279.2   T21 [next]  276.2: b\n",
      " 392.2   T22 [next]  389.2: c\n",
      " 402.1   T23 [cmpl]  399.1: fin\n"
     ]
    }
   ],
   "source": [
    "reset_start_time(O.delay)\n",
    "d = subs(marble_stream('a-b-c|').delay(150).merge(marble_stream('1-2-3|')))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# I want to transform items and notifications from an Observable into items and reemit them"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... by wrapping them in Notification objects **[materialize](http://reactivex.io/documentation/operators/materialize-dematerialize.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function materialize of module rx.linq.observable.materialize:\n",
      "Materializes the implicit notifications of an observable sequence as\n",
      "    explicit notification values.\n",
      "\n",
      "    Returns an observable sequence containing the materialized notification\n",
      "    values from the source sequence.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   0.8     M New subscription on stream 277716117\n",
      "   1.4     M [next]    0.5: kind: N, value: 1\n",
      "   1.8     M [next]    0.9: kind: N, value: 2\n",
      "   2.3     M [next]    1.3: kind: N, value: 3\n",
      "   2.5     M [next]    1.5: kind: C, value: None\n",
      "   2.5     M [cmpl]    1.5: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.materialize)\n",
    "def pretty(notif):\n",
    "    # this are the interesting attributes:\n",
    "    return 'kind: %(kind)s, value: %(value)s' % ItemGetter(notif)\n",
    "d = subs(O.from_((1, 2, 3)).materialize().map(lambda x: pretty(x)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### ... which I can then unwrap again with **[dematerialize](http://reactivex.io/documentation/operators/materialize-dematerialize.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function dematerialize of module rx.linq.observable.dematerialize:\n",
      "Dematerializes the explicit notification values of an observable\n",
      "    sequence as implicit notifications.\n",
      "\n",
      "    Returns an observable sequence exhibiting the behavior corresponding to\n",
      "    the source sequence's notification values.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   0.8     M New subscription on stream 276437645\n",
      "   1.4     M [next]    0.5: 1\n",
      "   1.8     M [next]    0.8: 2\n",
      "   2.3     M [next]    1.4: 3\n",
      "   2.6     M [cmpl]    1.7: fin\n",
      "\n",
      "\n",
      "========== Dematerializing manually created notifs ==========\n",
      "\n",
      "\n",
      "   3.3     M New subscription on stream 276437669\n",
      "   3.6     M [next]    0.3: foo\n",
      "   3.9     M [cmpl]    0.5: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.dematerialize)\n",
    "d = subs(O.from_((1, 2, 3)).materialize().dematerialize())\n",
    "\n",
    "header('Dematerializing manually created notifs')\n",
    "d = subs(O.from_((rx.core.notification.OnNext('foo'), rx.core.notification.OnCompleted())).dematerialize())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "898987.2     M New subscription on stream 276437705\n",
      "{mydump}-->{OnNext(1)}\n",
      "898987.9     M [next]    0.7: OnNext(1)\n",
      "{mydump}-->{OnNext(2)}\n",
      "898988.5     M [next]    1.3: OnNext(2)\n",
      "{mydump}-->{OnNext(3)}\n",
      "898989.0     M [next]    1.7: OnNext(3)\n",
      "{mydump}-->{OnCompleted()}\n",
      "898989.5     M [next]    2.2: OnCompleted()\n",
      "{mydump} completed\n",
      "898989.8     M [cmpl]    2.5: fin\n"
     ]
    }
   ],
   "source": [
    "# Materializing a sequence can be very handy for performing analysis or logging of a sequence.\n",
    "# You can unwrap a materialized sequence by applying the Dematerialize extension method. \n",
    "from rx.testing import dump\n",
    "d = subs(O.range(1, 3).materialize().dump(name='mydump'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# I want to ignore all items emitted by an Observable and only pass along its completed/error notification"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... **[ignore_elements](http://reactivex.io/documentation/operators/ignoreelements.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function ignore_elements of module rx.linq.observable.ignoreelements:\n",
      "Ignores all elements in an observable sequence leaving only the\n",
      "    termination messages.\n",
      "\n",
      "    Returns an empty observable {Observable} sequence that signals\n",
      "    termination, successful or exceptional, of the source sequence.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   0.5     M New subscription on stream 277710309\n",
      "   2.2     M [cmpl]    1.6: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.ignore_elements)\n",
    "d = subs(O.range(0, 10).ignore_elements())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# I want to mirror an Observable but prefix items to its sequence **[start_with](http://reactivex.io/documentation/operators/startwith.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function start_with of module rx.linq.observable.startswith:\n",
      "Prepends a sequence of values to an observable sequence with an\n",
      "    optional scheduler and an argument list of values to prepend.\n",
      "\n",
      "    1 - source.start_with(1, 2, 3)\n",
      "    2 - source.start_with(Scheduler.timeout, 1, 2, 3)\n",
      "\n",
      "    Returns the source sequence prepended with the specified values.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.7     M New subscription on stream 276437529\n",
      "   2.3     M [next]    0.5: 1\n",
      "   2.5     M [next]    0.7: 2\n",
      "   2.8     M [next]    0.9: 3\n",
      "   3.4     M [next]    1.5: a\n",
      "   3.6     M [next]    1.8: b\n",
      "   3.9     M [cmpl]    2.0: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.start_with)\n",
    "d = subs(O.from_(('a', 'b')).start_with(1, 2, 3))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... only if its sequence is empty **[default_if_empty](http://reactivex.io/documentation/operators/defaultifempty.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function default_if_empty of module rx.linq.observable.defaultifempty:\n",
      "Returns the elements of the specified sequence or the specified value\n",
      "    in a singleton sequence if the sequence is empty.\n",
      "\n",
      "    res = obs = xs.defaultIfEmpty()\n",
      "    obs = xs.defaultIfEmpty(False\n",
      "\n",
      "    Keyword arguments:\n",
      "    default_value -- The value to return if the sequence is empty. If not\n",
      "        provided, this defaults to None.\n",
      "\n",
      "    Returns an observable {Observable} sequence that contains the specified\n",
      "    default value if the source is empty otherwise, the elements of the\n",
      "    source itself.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   0.8     M New subscription on stream 276444765\n",
      "   1.4     M [next]    0.4: None\n",
      "   1.5     M [cmpl]    0.5: fin\n",
      "\n",
      "   2.5     M New subscription on stream 276444761\n",
      "   3.0     M [next]    0.2: hello world\n",
      "   3.3     M [cmpl]    0.5: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.default_if_empty)\n",
    "# the default here is to emit a None:\n",
    "d = subs(O.empty().default_if_empty())\n",
    "d = subs(O.empty().default_if_empty('hello world'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# I want to collect items from an Observable and reemit them as buffers of items **[buffer](http://reactivex.io/documentation/operators/buffer.html)**\n",
    "\n",
    "Very good intro is [here](http://xgrommx.github.io/rx-book/content/observable/observable_instance_methods/buffer.html)  \n",
    "Buffer 'closing' means: The buffer is flushed to the subscriber(s), then next buffer is getting filled.\n",
    "\n",
    "Note: The used scheduler seems not 100% exact timewise on the marble streams. But you get the idea."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function buffer of module rx.linq.observable.buffer:\n",
      "Projects each element of an observable sequence into zero or more\n",
      "    buffers.\n",
      "\n",
      "    Keyword arguments:\n",
      "    buffer_openings -- Observable sequence whose elements denote the\n",
      "        creation of windows.\n",
      "    closing_selector -- Or, a function invoked to define the boundaries of\n",
      "        the produced windows (a window is started when the previous one is\n",
      "        closed, resulting in non-overlapping windows).\n",
      "    buffer_closing_selector -- [optional] A function invoked to define the\n",
      "        closing of each produced window. If a closing selector function is\n",
      "        specified for the first parameter, this parameter is ignored.\n",
      "\n",
      "    Returns an observable sequence of windows.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "\n",
      "========== with closing selector ==========\n",
      "\n",
      "Returns an Observable that emits buffers of items it collects from the source Observable. The resulting Observable emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a new buffer whenever the Observable produced by the specified bufferClosingSelector emits an item.\n",
      "\n",
      "Calling the closer as is:\n",
      "\n",
      "   1.4     M New subscription on stream 276484633\n",
      " 323.4  T319 [next]  322.1: ['1', '2', '3']\n",
      " 533.3  T321 [next]  531.9: ['4', '5']\n",
      " 910.1  T317 [next]  908.8: ['6', '7', '8', '9']\n",
      " 911.0  T317 [cmpl]  909.6: fin\n",
      "\n",
      "Calling again and again -> equal buffer sizes flushed\n",
      "\n",
      "2010.0     M New subscription on stream 276473777\n",
      "2334.4  T346 [next]  324.4: ['1', '2', '3']\n",
      "2649.7  T350 [next]  639.7: ['4', '5', '6']\n",
      "2921.9  T343 [next]  911.9: ['7', '8', '9']\n",
      "2922.6  T343 [cmpl]  912.6: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.buffer)\n",
    "\n",
    "header('with closing selector')\n",
    "# the simplest one:\n",
    "print('''Returns an Observable that emits buffers of items it collects from the source Observable. The resulting Observable emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a new buffer whenever the Observable produced by the specified bufferClosingSelector emits an item.''')\n",
    "xs = marble_stream('1-2-3-4-5-6-7-8-9|')\n",
    "# defining when to flush the buffer to the subscribers:\n",
    "cs = marble_stream('---e--e----------|')\n",
    "print('\\nCalling the closer as is:')\n",
    "d = subs(xs.buffer(closing_selector=cs))\n",
    "sleep(2)\n",
    "print('\\nCalling again and again -> equal buffer sizes flushed')\n",
    "cs = marble_stream('---e|')\n",
    "d = subs(xs.buffer(closing_selector=lambda: cs))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== with buffer closing selector ==========\n",
      "\n",
      "Subscribing two times with different buffer sizes\n",
      "\n",
      "   1.3     M New subscription on stream 277176113\n",
      "\n",
      "  12.6     M New subscription on stream 281768273\n",
      " 234.6 T1826 [next]  221.8: ['1', '2'] (small bufs)\n",
      " 452.4 T1830 [next]  439.6: ['3', '4'] (small bufs)\n",
      " 624.7 T1802 [next]  623.3: ['1', '2', '3', '4', '5', '6'] (BIIIIIG bufs)\n",
      " 668.6 T1834 [next]  655.8: ['5', '6'] (small bufs)\n",
      " 883.7 T1842 [next]  871.0: ['7', '8'] (small bufs)\n",
      " 911.4 T1800 [next]  910.1: ['7', '8', '9'] (BIIIIIG bufs)\n",
      " 912.1 T1800 [cmpl]  910.7: fin (BIIIIIG bufs)\n",
      " 920.1 T1823 [next]  907.3: ['9'] (small bufs)\n",
      " 920.7 T1823 [cmpl]  907.9: fin (small bufs)\n"
     ]
    }
   ],
   "source": [
    "rst(title='with buffer closing selector')\n",
    "\n",
    "xs = marble_stream('1-2-3-4-5-6-7-8-9|')\n",
    "# the more '-' the bigger the emitted buffers.\n",
    "# Called again and again:\n",
    "cs  = marble_stream('------e|')\n",
    "cs2 = marble_stream('--e|')\n",
    "print ('Subscribing two times with different buffer sizes')\n",
    "d = subs(xs.buffer(buffer_closing_selector=lambda: cs), name='BIIIIIG bufs')\n",
    "d = subs(xs.buffer(buffer_closing_selector=lambda: cs2),name='small bufs')\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== with buffer opening selector ==========\n",
      "\n",
      "\n",
      "   0.8     M New subscription on stream 276484729\n",
      " 323.2 T1769 [next]  322.3: ['1', '2', '3']\n",
      " 642.0 T1774 [next]  641.2: ['4', '5', '6']\n",
      " 911.3 T1768 [next]  910.4: ['7', '8', '9']\n",
      " 915.0 T1768 [cmpl]  914.2: fin\n"
     ]
    }
   ],
   "source": [
    "rst(title='with buffer opening selector')\n",
    "\n",
    "xs =     marble_stream('1-2-3-4-5-6-7-8-9|')\n",
    "opens  = marble_stream('---o|')\n",
    "d = subs(xs.buffer(buffer_openings=lambda: opens))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== with buffer opening and closing selector ==========\n",
      "\n",
      "\n",
      "   1.0     M New subscription on stream 276476373\n",
      "2530.7 T1741 [next] 2529.6: ['1', '2', '3', '4', '5', '6', '7', '8', '9', '1', '2', '3', '4', '5', '6', '7', '8', '9', '1', '2', '3', '4', '5']\n",
      "2900.7 T1739 [next] 2899.7: ['6', '7', '8', '9']\n",
      "2901.9 T1739 [cmpl] 2900.8: fin\n"
     ]
    }
   ],
   "source": [
    "rst(title='with buffer opening and closing selector')\n",
    "#TODO: behaviour not really understood. Bug?\n",
    "xs =     marble_stream('1-2-3-4-5-6-7-8-9-1-2-3-4-5-6-7-8-9-1-2-3-4-5-6-7-8-9|')\n",
    "opens  = marble_stream('oo---------------------------------------------------|')\n",
    "closes = marble_stream('-------------------------c|')\n",
    "d = subs(xs.buffer(buffer_openings=opens, buffer_closing_selector=lambda: closes))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### ... buffering by counts **[buffer_with_count](http://reactivex.io/documentation/operators/buffer.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function buffer_with_count of module rx.linq.observable.buffer:\n",
      "Projects each element of an observable sequence into zero or more\n",
      "    buffers which are produced based on element count information.\n",
      "\n",
      "    Example:\n",
      "    res = xs.buffer_with_count(10)\n",
      "    res = xs.buffer_with_count(10, 1)\n",
      "\n",
      "    Keyword parameters:\n",
      "    count -- {Number} Length of each buffer.\n",
      "    skip -- {Number} [Optional] Number of elements to skip between creation\n",
      "        of consecutive buffers. If not provided, defaults to the count.\n",
      "\n",
      "    Returns an observable {Observable} sequence of buffers.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   0.8     M New subscription on stream 276532201\n",
      " 126.4 T1963 [next]  125.4: ['1', '2']\n",
      " 676.4 T1973 [next]  675.4: ['6', '7']\n",
      "1228.0 T1983 [next] 1226.9: ['2', '3']\n",
      "1782.0 T1993 [next] 1781.0: ['7', '8']\n",
      "1902.2 T1997 [cmpl] 1901.2: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.buffer_with_count)\n",
    "xs = marble_stream('1-2-3-4-5-6-7-8-9-1-2-3-4-5-6-7-8-9|')\n",
    "d = subs(xs.buffer_with_count(2, skip=5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### ... and take only the last (by count) **[take_last_buffer](http://reactivex.io/documentation/operators/takelast.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function take_last_buffer of module rx.linq.observable.takelastbuffer:\n",
      "Returns an array with the specified number of contiguous elements\n",
      "    from the end of an observable sequence.\n",
      "\n",
      "    Example:\n",
      "    res = source.take_last(5)\n",
      "\n",
      "    Description:\n",
      "    This operator accumulates a buffer with a length enough to store\n",
      "    elements count elements. Upon completion of the source sequence, this\n",
      "    buffer is drained on the result sequence. This causes the elements to be\n",
      "    delayed.\n",
      "\n",
      "    Keyword arguments:\n",
      "    :param int count: Number of elements to take from the end of the source\n",
      "        sequence.\n",
      "\n",
      "    :returns: An observable sequence containing a single list with the specified \n",
      "    number of elements from the end of the source sequence.\n",
      "    :rtype: Observable\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.2     M New subscription on stream 276495909\n",
      " 468.7 T2029 [next]  467.4: ['4', '5']\n",
      " 469.2 T2029 [cmpl]  468.0: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.take_last_buffer)\n",
    "xs = marble_stream('1-2-3-4-5|')\n",
    "d = subs(xs.take_last_buffer(2))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### ... and take only the first (by time) **[take_with_time](http://reactivex.io/documentation/operators/takelast.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function take_with_time of module rx.linq.observable.takewithtime:\n",
      "Takes elements for the specified duration from the start of the\n",
      "    observable source sequence, using the specified scheduler to run timers.\n",
      "\n",
      "    Example:\n",
      "    res = source.take_with_time(5000,  [optional scheduler])\n",
      "\n",
      "    Description:\n",
      "    This operator accumulates a queue with a length enough to store elements\n",
      "    received during the initial duration window. As more elements are\n",
      "    received, elements older than the specified duration are taken from the\n",
      "    queue and produced on the result sequence. This causes elements to be\n",
      "    delayed with duration.\n",
      "\n",
      "    Keyword arguments:\n",
      "    duration -- {Number} Duration for taking elements from the start of the\n",
      "        sequence.\n",
      "    scheduler -- {Scheduler} Scheduler to run the timer on. If not\n",
      "        specified, defaults to rx.Scheduler.timeout.\n",
      "\n",
      "    Returns {Observable} An observable sequence with the elements taken\n",
      "    during the specified duration from the start of the source sequence.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.2     M New subscription on stream 281756025\n",
      "  14.0 T2046 [next]   12.6: 1\n",
      " 125.7 T2048 [next]  124.3: 2\n",
      " 234.1 T2049 [next]  232.7: 3\n",
      " 317.1 T2045 [cmpl]  315.7: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.take_with_time)\n",
    "xs = marble_stream('1-2-3-4-5|')\n",
    "d = subs(xs.take_with_time(310))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### ... or only the last (by time) **[take_last_with_time](http://reactivex.io/documentation/operators/takelast.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function take_last_with_time of module rx.linq.observable.takelastwithtime:\n",
      "Returns elements within the specified duration from the end of the\n",
      "    observable source sequence, using the specified schedulers to run timers\n",
      "    and to drain the collected elements.\n",
      "\n",
      "    Example:\n",
      "    res = source.take_last_with_time(5000, scheduler)\n",
      "\n",
      "    Description:\n",
      "    This operator accumulates a queue with a length enough to store elements\n",
      "    received during the initial duration window. As more elements are\n",
      "    received, elements older than the specified duration are taken from the\n",
      "    queue and produced on the result sequence. This causes elements to be\n",
      "    delayed with duration.\n",
      "\n",
      "    Keyword arguments:\n",
      "    duration -- {Number} Duration for taking elements from the end of the\n",
      "        sequence.\n",
      "    scheduler -- {Scheduler} [Optional] Scheduler to run the timer on. If\n",
      "        not specified, defaults to rx.Scheduler.timeout.\n",
      "\n",
      "    Returns {Observable} An observable sequence with the elements taken\n",
      "    during the specified duration from the end of the source sequence.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   0.8     M New subscription on stream 277184037\n",
      " 468.7 T2067 [next]  467.7: 3\n",
      " 469.0 T2067 [next]  468.0: 4\n",
      " 469.3 T2067 [next]  468.3: 5\n",
      " 469.4 T2067 [cmpl]  468.4: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.take_last_with_time)\n",
    "xs = marble_stream('1-2-3-4-5|')\n",
    "d = subs(xs.take_last_with_time(310))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# I want to split one Observable into multiple Observables **[window](http://reactivex.io/documentation/operators/window.html)**\n",
    "\n",
    "Window is similar to Buffer, but rather than emitting packets of items from the source Observable, it emits Observables, each one of which emits a subset of items from the source Observable and then terminates with an onCompleted notification.\n",
    "\n",
    "Like Buffer, Window has many varieties, each with its own way of subdividing the original Observable into the resulting Observable emissions, each one of which contains a “window” onto the original emitted items. In the terminology of the Window operator, when a window “opens,” this means that a new Observable is emitted and that Observable will begin emitting items emitted by the source Observable. When a window “closes,” this means that the emitted Observable stops emitting items from the source Observable and terminates with an onCompleted notification to its observers.\n",
    "\n",
    "from: http://www.introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html#Window\n",
    "> A major difference we see here is that the Window operators can notify you of values from the source as soon as they are produced. The Buffer operators, on the other hand, must wait until the window closes before the values can be notified as an entire list."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 55,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== window with count ==========\n",
      "\n",
      "function window_with_count of module rx.linq.observable.windowwithcount:\n",
      "Projects each element of an observable sequence into zero or more\n",
      "    windows which are produced based on element count information.\n",
      "\n",
      "    1 - xs.window_with_count(10)\n",
      "    2 - xs.window_with_count(10, 1)\n",
      "\n",
      "    count -- Length of each window.\n",
      "    skip -- [Optional] Number of elements to skip between creation of\n",
      "        consecutive windows. If not specified, defaults to the count.\n",
      "\n",
      "    Returns an observable sequence of windows.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.1     M New subscription on stream 277141053\n",
      "   2.0     M starting new window 1\n",
      "\n",
      "   2.4     M New subscription on stream 276132537\n",
      "   3.0     M [next]    1.6: None (outer subscription)\n",
      " 106.7 T1844 [next]  104.0: 0 (window id 1)\n",
      " 212.4 T1845 [next]  209.7: 1 (window id 1)\n",
      " 313.7 T1846 [next]  310.9: 2 (window id 1)\n",
      " 314.0 T1846 [cmpl]  311.3: fin (window id 1)\n",
      " 314.5 T1846 starting new window 2\n",
      "\n",
      " 314.7 T1846 New subscription on stream 276613473\n",
      " 315.3 T1846 [next]  314.0: None (outer subscription)\n",
      " 419.1 T1847 [next]  104.4: 3 (window id 2)\n",
      " 521.0 T1848 [next]  206.3: 4 (window id 2)\n",
      " 625.9 T1849 [next]  311.2: 5 (window id 2)\n",
      " 626.3 T1849 [cmpl]  311.6: fin (window id 2)\n",
      " 626.6 T1849 starting new window 3\n",
      "\n",
      " 627.0 T1849 New subscription on stream 276609545\n",
      " 627.4 T1849 [next]  626.0: None (outer subscription)\n",
      " 730.2 T1850 [next]  103.2: 6 (window id 3)\n",
      " 835.8 T1851 [next]  208.8: 7 (window id 3)\n",
      " 937.1 T1852 [next]  310.1: 8 (window id 3)\n",
      " 937.5 T1852 [cmpl]  310.4: fin (window id 3)\n",
      " 937.7 T1852 starting new window 4\n",
      "\n",
      " 937.9 T1852 New subscription on stream 276550141\n",
      " 938.2 T1852 [next]  936.8: None (outer subscription)\n",
      "1039.1 T1853 [next]  101.3: 9 (window id 4)\n",
      "1039.5 T1853 [cmpl]  101.6: fin (window id 4)\n",
      "1040.3 T1853 [cmpl] 1038.9: fin (outer subscription)\n"
     ]
    }
   ],
   "source": [
    "rst(O.window_with_count, title=\"window with count\")\n",
    "wid = 0 # window id\n",
    "def show_stream(window):\n",
    "    global wid\n",
    "    wid += 1\n",
    "    log('starting new window', wid)\n",
    "    # yes we can subscribe normally, its not buffers but observables:\n",
    "    subs(window, name='window id %s' % wid)\n",
    "    \n",
    "src = O.interval(100).take(10).window_with_count(3).map(lambda window: show_stream(window))\n",
    "\n",
    "d = subs(src, name='outer subscription')\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "> It is left to the reader to explore the other window functions offered by RxPY, working similar to buffer:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": false,
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== window ==========\n",
      "\n",
      "function window of module rx.linq.observable.window:\n",
      "Projects each element of an observable sequence into zero or more\n",
      "    windows.\n",
      "\n",
      "    Keyword arguments:\n",
      "    :param Observable window_openings: Observable sequence whose elements\n",
      "        denote the creation of windows.\n",
      "    :param types.FunctionType window_closing_selector: [Optional] A function\n",
      "        invoked to define the closing of each produced window. It defines the\n",
      "        boundaries of the produced windows (a window is started when the\n",
      "        previous one is closed, resulting in non-overlapping windows).\n",
      "\n",
      "    :returns: An observable sequence of windows.\n",
      "    :rtype: Observable[Observable]\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "\n",
      "========== window_with_time(self, timespan, timeshift=None, scheduler=None) ==========\n",
      "\n",
      "function window_with_time of module rx.linq.observable.windowwithtime:\n",
      "n.a.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "\n",
      "========== window_with_time_or_count(self, timespan, count, scheduler=None) ==========\n",
      "\n",
      "function window_with_time_or_count of module rx.linq.observable.windowwithtimeorcount:\n",
      "n.a.\n",
      "--------------------------------------------------------------------------------\n"
     ]
    }
   ],
   "source": [
    "rst(O.window, title=\"window\")\n",
    "rst(O.window_with_time, title=\"window_with_time(self, timespan, timeshift=None, scheduler=None)\")\n",
    "rst(O.window_with_time_or_count, title=\"window_with_time_or_count(self, timespan, count, scheduler=None)\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ...so that similar items end up on the same Observable **[group_by](http://reactivex.io/documentation/operators/groupby.html)**\n",
    "\n",
    "The GroupBy operator divides an Observable that emits items into an Observable that emits Observables, each one of which emits some subset of the items from the original source Observable. Which items end up on which Observable is typically decided by a discriminating function that evaluates each item and assigns it a key. All items with the same key are emitted by the same Observable.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 56,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function group_by of module rx.linq.observable.groupby:\n",
      "Groups the elements of an observable sequence according to a\n",
      "    specified key selector function and comparer and selects the resulting\n",
      "    elements by using a specified function.\n",
      "\n",
      "    1 - observable.group_by(lambda x: x.id)\n",
      "    2 - observable.group_by(lambda x: x.id, lambda x: x.name)\n",
      "    3 - observable.group_by(\n",
      "        lambda x: x.id,\n",
      "        lambda x: x.name,\n",
      "        lambda x: str(x))\n",
      "\n",
      "    Keyword arguments:\n",
      "    key_selector -- A function to extract the key for each element.\n",
      "    element_selector -- [Optional] A function to map each source element to\n",
      "        an element in an observable group.\n",
      "    comparer -- {Function} [Optional] Used to determine whether the objects\n",
      "        are equal.\n",
      "\n",
      "    Returns a sequence of observable groups, each of which corresponds to a\n",
      "    unique key value, containing all elements that share that same key\n",
      "    value.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "Total streams: 6\n",
      "Count 2\n",
      "Count 2\n",
      "Count 2\n",
      "Count 2\n",
      "Count 1\n",
      "Count 1\n"
     ]
    }
   ],
   "source": [
    "rst(O.group_by)\n",
    "keyCode = 'keyCode'\n",
    "codes = [\n",
    "    { keyCode: 38}, #// up\n",
    "    { keyCode: 38}, #// up\n",
    "    { keyCode: 40}, #// down\n",
    "    { keyCode: 40}, #// down\n",
    "    { keyCode: 37}, #// left\n",
    "    { keyCode: 39}, #// right\n",
    "    { keyCode: 37}, #// left\n",
    "    { keyCode: 39}, #// right\n",
    "    { keyCode: 66}, #// b\n",
    "    { keyCode: 65}  #// a\n",
    "]\n",
    "\n",
    "src = O.from_(codes).group_by(\n",
    "    key_selector     = lambda x: x[keyCode], # id of (potentially new) streams\n",
    "    element_selector = lambda x: x[keyCode]  # membership to which stream\n",
    ")\n",
    "# we have now 6 streams\n",
    "src.count().subscribe(lambda total: print ('Total streams:', total))\n",
    "d = src.subscribe(lambda obs: obs.count().subscribe(lambda x: print ('Count', x)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 57,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== group by (with time intervals) ==========\n",
      "\n",
      "function group_by_until of module rx.linq.observable.groupbyuntil:\n",
      "Groups the elements of an observable sequence according to a\n",
      "    specified key selector function. A duration selector function is used\n",
      "    to control the lifetime of groups. When a group expires, it receives\n",
      "    an OnCompleted notification. When a new element with the same key value\n",
      "    as a reclaimed group occurs, the group will be reborn with a new\n",
      "    lifetime request.\n",
      "\n",
      "    1 - observable.group_by_until(\n",
      "            lambda x: x.id,\n",
      "            None,\n",
      "            lambda : Rx.Observable.never()\n",
      "        )\n",
      "    2 - observable.group_by_until(\n",
      "            lambda x: x.id,\n",
      "            lambda x: x.name,\n",
      "            lambda: Rx.Observable.never()\n",
      "        )\n",
      "    3 - observable.group_by_until(\n",
      "            lambda x: x.id,\n",
      "            lambda x: x.name,\n",
      "            lambda:  Rx.Observable.never(),\n",
      "            lambda x: str(x))\n",
      "\n",
      "    Keyword arguments:\n",
      "    key_selector -- A function to extract the key for each element.\n",
      "    duration_selector -- A function to signal the expiration of a group.\n",
      "    comparer -- [Optional] {Function} Used to compare objects. When not\n",
      "        specified, the default comparer is used. Note: this argument will be\n",
      "        ignored in the Python implementation of Rx. Python objects knows,\n",
      "        or should know how to compare themselves.\n",
      "\n",
      "    Returns a sequence of observable groups, each of which corresponds to\n",
      "    a unique key value, containing all elements that share that same key\n",
      "    value. If a group's lifetime expires, a new group with the same key\n",
      "    value can be created once an element with such a key value is\n",
      "    encountered.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "\n",
      "========== grouping interval short ==========\n",
      "\n",
      "Distinct elements within 20ms: 10\n",
      "\n",
      "\n",
      "========== grouping interval medium ==========\n",
      "\n",
      "Distinct elements within 200ms: 8\n",
      "\n",
      "\n",
      "========== grouping interval long ==========\n",
      "\n",
      "Distinct elements within 1000ms: 6\n"
     ]
    }
   ],
   "source": [
    "rst(O.group_by_until, title='group by (with time intervals)')\n",
    "\n",
    "src = marble_stream('-(38)-(38)-(40)-(40)-(37)-(39)-(37)-(39)-(66)-(65)-|')\n",
    "\n",
    "def count(interval):\n",
    "    grouped = src.group_by_until(\n",
    "                key_selector     = lambda x: x,   # id of (potentially new) streams\n",
    "                element_selector = lambda x: x,  # membership to which stream\n",
    "                duration_selector= lambda x: O.timer(interval))\n",
    "\n",
    "    d = grouped.count().subscribe(lambda total: print (\n",
    "            'Distinct elements within %sms: %s' % (interval, total)))\n",
    "\n",
    "\n",
    "header('grouping interval short')\n",
    "# now every event is unique, any older stream is forgotten when it occurs:\n",
    "count(20)\n",
    "sleep(2)\n",
    "header('grouping interval medium')\n",
    "# just enough to detect the directly following doublicates:\n",
    "count(200)\n",
    "sleep(2)\n",
    "header('grouping interval long')\n",
    "count(1000)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# I want to retrieve a particular item emitted by an Observable:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... the first item emitted **[first](http://reactivex.io/documentation/operators/first.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function first of module rx.linq.observable.first:\n",
      "Returns the first element of an observable sequence that satisfies\n",
      "    the condition in the predicate if present else the first item in the\n",
      "    sequence.\n",
      "\n",
      "    Example:\n",
      "    res = res = source.first()\n",
      "    res = res = source.first(lambda x: x > 3)\n",
      "\n",
      "    Keyword arguments:\n",
      "    predicate -- {Function} [Optional] A predicate function to evaluate for\n",
      "        elements in the source sequence.\n",
      "\n",
      "    Returns {Observable} Sequence containing the first element in the\n",
      "    observable sequence that satisfies the condition in the predicate if\n",
      "    provided, else the first item in the sequence.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   0.6     M New subscription on stream 277141121\n",
      "   1.4     M [next]    0.6: 1\n",
      "   1.5     M [cmpl]    0.7: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.first)\n",
    "d = subs(O.from_((1, 2, 3, 4)).first(lambda x, i: x < 3))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... the sole item it emitted **[single](http://reactivex.io/documentation/operators/single.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function single of module rx.linq.observable.single:\n",
      "Returns the only element of an observable sequence that satisfies the\n",
      "    condition in the optional predicate, and reports an exception if there\n",
      "    is not exactly one element in the observable sequence.\n",
      "\n",
      "    Example:\n",
      "    res = source.single()\n",
      "    res = source.single(lambda x: x == 42)\n",
      "\n",
      "    Keyword arguments:\n",
      "    predicate -- {Function} [Optional] A predicate function to evaluate for\n",
      "        elements in the source sequence.\n",
      "\n",
      "    Returns {Observable} Sequence containing the single element in the\n",
      "    observable sequence that satisfies the condition in the predicate.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   0.7     M New subscription on stream 277177261\n",
      "   1.6     M [next]    0.9: 3\n",
      "   1.7     M [cmpl]    1.0: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.single)\n",
    "# you can also match on the index i:\n",
    "d = subs(O.from_((1, 2, 3, 4)).single(lambda x, i: (x, i) == (3, 2)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... the last item emitted before it completed **[last](http://reactivex.io/documentation/operators/last.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "function last of module rx.linq.observable.last:\n",
      "Returns the last element of an observable sequence that satisfies the\n",
      "    condition in the predicate if specified, else the last element.\n",
      "\n",
      "    Example:\n",
      "    res = source.last()\n",
      "    res = source.last(lambda x: x > 3)\n",
      "\n",
      "    Keyword arguments:\n",
      "    predicate -- {Function} [Optional] A predicate function to evaluate for\n",
      "        elements in the source sequence.\n",
      "\n",
      "    Returns {Observable} Sequence containing the last element in the\n",
      "    observable sequence that satisfies the condition in the predicate.\n",
      "    \n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   0.6     M New subscription on stream 276611193\n",
      "   2.1     M [next]    1.5: 2\n",
      "   2.4     M [cmpl]    1.7: fin\n"
     ]
    }
   ],
   "source": [
    "rst(O.last)\n",
    "d = subs(O.from_((1, 2, 3, 4)).last(lambda x: x < 3))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
